Create initial skeleton for an EventHubs stress test. #4250
LarryOsterman wants to merge 4 commits into Azure:main from
Conversation
Pull request overview
Adds an initial, custom (non-harness) Event Hubs stress-test runner under azure_messaging_eventhubs/tests, exposing two scenarios via a clap-driven CLI to support ad-hoc durability/throughput testing.
Changes:
- Introduces a `stress_tests` integration-test entrypoint that builds a subcommand CLI and runs one or all scenarios.
- Adds two initial scenarios: a bounded publish/read test and a long-running continuous send/receive loop with periodic reporting.
- Updates dev-dependencies to support the CLI and `.env`-based configuration.
Reviewed changes
Copilot reviewed 5 out of 6 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests.rs | New stress-test runner with clap subcommands and tokio main |
| sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests/basic_publish_read_test.rs | Adds bounded publish/read scenario implementation + metrics/validation |
| sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests/continuous_send_receive_stress.rs | Adds continuous send/receive durability loop + periodic reporting |
| sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests/README.md | Documents scenarios, CLI usage, and environment variables |
| sdk/eventhubs/azure_messaging_eventhubs/Cargo.toml | Adds dev-deps for clap + dotenvy to support the stress test runner |
| Cargo.lock | Locks new dev-dependency additions |
```rust
// Start reading from the beginning
let partition_id = format!("{}", consumer_id % 2); // Use partition 0 or 1 based on consumer ID
let receiver = consumer
    .open_receiver_on_partition(
        partition_id.clone(),
        Some(OpenReceiverOptions {
            start_position: Some(StartPosition {
                location: StartLocation::Earliest,
```
Consumers start at StartLocation::Earliest, which will read historical events from the partition and can satisfy expected_events without ever consuming events from this test run. For a bounded publish/read validation, start from Latest (or from an enqueued time captured just before producers start) so the consumed set corresponds to the events published by this run.
Suggested change:

```diff
-// Start reading from the beginning
+// Start reading from the latest position so this test only counts events from the current run
 let partition_id = format!("{}", consumer_id % 2); // Use partition 0 or 1 based on consumer ID
 let receiver = consumer
     .open_receiver_on_partition(
         partition_id.clone(),
         Some(OpenReceiverOptions {
             start_position: Some(StartPosition {
-                location: StartLocation::Earliest,
+                location: StartLocation::Latest,
```
```rust
let expected_events = config.event_count;
let consumed_events = consumed_events.clone();

let handle = tokio::spawn(async move {
    run_consumer_task(
        host,
        eventhub,
        credential,
        consumer_id,
        expected_events,
        consumed_events,
    )
```
expected_events is set to config.event_count for every consumer task. With multiple consumers, that makes the aggregate expected count event_count * consumer_count and can lead to unnecessary timeouts/hanging behavior (especially since consumers may share partitions). Consider dividing expected events per consumer based on partition assignment, or coordinating completion based on a shared consumed counter/map.
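One way to coordinate completion, sketched below under hedged assumptions (this is not code from the PR): instead of handing every consumer the full `expected_events` count, a helper polls the shared consumed map until the aggregate number of distinct events reaches the run's total. The map shape mirrors the `Arc<Mutex<HashMap<String, String>>>` used elsewhere in this test, with tokio's async mutex assumed.

```rust
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::Mutex;

/// Wait until the consumers have collectively observed `total_expected` distinct events.
/// This replaces per-consumer expected counts with a single aggregate completion check.
async fn wait_for_all_events(
    consumed_events: Arc<Mutex<HashMap<String, String>>>,
    total_expected: usize,
    poll_interval: Duration,
) {
    loop {
        if consumed_events.lock().await.len() >= total_expected {
            return;
        }
        tokio::time::sleep(poll_interval).await;
    }
}
```

The main task could await this helper inside a `tokio::time::timeout` and cancel the consumer tasks afterwards, so no consumer ever has to guess its own share of the events.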
```rust
#[tokio::main]
async fn main() {
    // Initialize tracing for test output
    tracing_subscriber::fmt()
```
tests/stress_tests.rs defines its own main and is intended to be a custom-harness test binary, but there is no [[test]] entry in this crate’s Cargo.toml with harness = false for the stress_tests target. As-is, this integration test will fail to compile due to the default libtest harness generating its own main. Add a [[test]] name = "stress_tests" section with harness = false (and keep the file as the entrypoint).
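For reference, a minimal sketch of the manifest entry this comment and the related Cargo.toml comment describe; the `stress-tests` feature name comes from the later suggestion about `required-features` and is an assumption about how the gating feature would be spelled.

```toml
# Declare the custom-harness stress test target; the non-default feature keeps it
# out of ordinary `cargo test` runs.
[[test]]
name = "stress_tests"
harness = false
required-features = ["stress-tests"]

[features]
stress-tests = []
```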
```rust
// No subcommand: run all with default args
println!("Running all stress tests with default settings...");
for spec in registry() {
    let default_matches = (spec.configure)(Command::new(spec.name))
        .no_binary_name(true)
        .get_matches_from(Vec::<&str>::new());

    if let Err(e) = (spec.run)(default_matches).await {
        eprintln!("{} FAILED: {}", spec.name, e);
        process::exit(1);
    }
}
```
Even with a harness = false test target, cargo test will execute this binary by default, but it requires live env vars and (by default) runs a 72h scenario. This is very likely to break local cargo test runs and CI. Consider gating the test target behind required-features (e.g., required-features = ["stress-tests"]) and adding a non-default feature, or moving this CLI into examples/ or a separate crate/binary so it never runs as part of normal tests.
Suggested change:

```diff
-// No subcommand: run all with default args
-println!("Running all stress tests with default settings...");
-for spec in registry() {
-    let default_matches = (spec.configure)(Command::new(spec.name))
-        .no_binary_name(true)
-        .get_matches_from(Vec::<&str>::new());
-    if let Err(e) = (spec.run)(default_matches).await {
-        eprintln!("{} FAILED: {}", spec.name, e);
-        process::exit(1);
-    }
-}
+// No subcommand: do not run stress tests implicitly.
+let mut cmd = build_cli();
+let _ = cmd.print_help();
+eprintln!();
+eprintln!();
+eprintln!(
+    "No stress test selected. Specify a subcommand to run a stress test explicitly."
+);
```
```rust
// Start reading from the beginning
let partition_id = format!("{}", consumer_id % 2); // Use partition 0 or 1 based on consumer ID
```
The consumer selects a partition with consumer_id % 2, assuming exactly two partitions. This will fail for Event Hubs with a different partition count (invalid partition id) and will never read from partitions beyond 0/1. Query the hub’s partition_ids (as the continuous test does) and assign consumers across the real set of partitions.
Suggested change:

```diff
-// Start reading from the beginning
-let partition_id = format!("{}", consumer_id % 2); // Use partition 0 or 1 based on consumer ID
+let eventhub_properties = consumer
+    .get_eventhub_properties()
+    .await
+    .map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync>)?;
+if eventhub_properties.partition_ids.is_empty() {
+    return Err(Box::new(std::io::Error::new(
+        std::io::ErrorKind::Other,
+        "Event Hub has no partitions",
+    )));
+}
+// Start reading from the beginning on a real partition from the Event Hub.
+let partition_id =
+    eventhub_properties.partition_ids[consumer_id % eventhub_properties.partition_ids.len()]
+        .clone();
```
```toml
azure_messaging_eventhubs = { path = ".", features = [
    "in_memory_checkpoint_store",
] }
clap.workspace = true
criterion.workspace = true
dotenvy = "0.15.7"
fe2o3-amqp = { workspace = true, features = ["tracing"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] }
```
This change adds clap/dotenvy dev-dependencies for the new stress-test CLI, but the crate is still missing a [[test]] section to declare the stress_tests integration test as harness = false (and ideally required-features = [...] to prevent it from running by default). Without that, the new tests/stress_tests.rs (which defines main) won’t compile as an integration test, and/or will run unexpectedly in CI.
```rust
impl Default for TestConfig {
    fn default() -> Self {
        Self {
            event_count: 1000,
            producer_count: 2,
            consumer_count: 2,
            test_timeout: Duration::from_secs(300), // 5 minutes
            event_size: 1024, // 1KB
            batch_size: 100,
        }
```
TestConfig::default() values (event_count=1000, producer/consumer_count=2, etc.) don’t match the clap defaults / run_standalone fallbacks (100 events, 1 producer, 1 consumer). This makes the “default config” ambiguous and can lead to confusing behavior depending on whether callers use TestConfig::default() vs CLI. Consider aligning these defaults (either update Default or the clap .default_value(...) + unwrap_or(...) values).
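For illustration, one way to align the two (a sketch, not the PR's code): make `Default` reuse the CLI fallback values shown in the snippet below, so both entry points describe the same baseline run. It assumes the existing `TestConfig` struct from the stress test.

```rust
use std::time::Duration;

// Values mirror the clap fallbacks below:
// 100 events, 1 producer, 1 consumer, 120 s timeout, 512-byte events, batches of 10.
impl Default for TestConfig {
    fn default() -> Self {
        Self {
            event_count: 100,
            producer_count: 1,
            consumer_count: 1,
            test_timeout: Duration::from_secs(120),
            event_size: 512,
            batch_size: 10,
        }
    }
}
```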
```rust
let config = TestConfig {
    event_count: *matches.get_one::<u32>("events").unwrap_or(&100) as usize,
    producer_count: *matches.get_one::<u32>("producers").unwrap_or(&1) as usize,
    consumer_count: *matches.get_one::<u32>("consumers").unwrap_or(&1) as usize,
    test_timeout: std::time::Duration::from_secs(
        *matches.get_one::<u64>("timeout").unwrap_or(&120),
    ),
    event_size: *matches.get_one::<u32>("event-size").unwrap_or(&512) as usize,
    batch_size: *matches.get_one::<u32>("batch-size").unwrap_or(&10) as usize,
```
producer_count and batch_size are taken from CLI without validation. producer_count == 0 will panic (division by zero in start_producers), and batch_size == 0 will cause an infinite loop in run_producer_task (batch_end never advances). Add clap validation (e.g., value_parser range >=1) and/or explicit checks that return a user-friendly error before running.
Suggested change:

```diff
-let config = TestConfig {
-    event_count: *matches.get_one::<u32>("events").unwrap_or(&100) as usize,
-    producer_count: *matches.get_one::<u32>("producers").unwrap_or(&1) as usize,
-    consumer_count: *matches.get_one::<u32>("consumers").unwrap_or(&1) as usize,
-    test_timeout: std::time::Duration::from_secs(
-        *matches.get_one::<u64>("timeout").unwrap_or(&120),
-    ),
-    event_size: *matches.get_one::<u32>("event-size").unwrap_or(&512) as usize,
-    batch_size: *matches.get_one::<u32>("batch-size").unwrap_or(&10) as usize,
+let event_count = *matches.get_one::<u32>("events").unwrap_or(&100) as usize;
+let producer_count = *matches.get_one::<u32>("producers").unwrap_or(&1) as usize;
+let consumer_count = *matches.get_one::<u32>("consumers").unwrap_or(&1) as usize;
+let test_timeout =
+    std::time::Duration::from_secs(*matches.get_one::<u64>("timeout").unwrap_or(&120));
+let event_size = *matches.get_one::<u32>("event-size").unwrap_or(&512) as usize;
+let batch_size = *matches.get_one::<u32>("batch-size").unwrap_or(&10) as usize;
+if producer_count == 0 {
+    return Err("`--producers` must be greater than 0".into());
+}
+if batch_size == 0 {
+    return Err("`--batch-size` must be greater than 0".into());
+}
+let config = TestConfig {
+    event_count,
+    producer_count,
+    consumer_count,
+    test_timeout,
+    event_size,
+    batch_size,
```
```rust
let published_events = Arc::new(Mutex::new(HashMap::<String, String>::new()));
let consumed_events = Arc::new(Mutex::new(HashMap::<String, String>::new()));
```
This test uses std::sync::Mutex for published_events / consumed_events and locks it inside async tasks. In a Tokio runtime this can block the executor thread and distort stress test results (and in worst cases lead to starvation). Prefer tokio::sync::Mutex (or a concurrent map like DashMap) for data shared across async tasks.
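A minimal sketch of the suggested switch, assuming the same `HashMap<String, String>` payload; awaiting tokio's async mutex suspends the task and lets the worker thread run other tasks, rather than blocking it the way `std::sync::Mutex::lock` does.

```rust
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;

type SharedEvents = Arc<Mutex<HashMap<String, String>>>;

// Hypothetical helper: record a consumed event from inside an async task.
async fn record_consumed(consumed_events: &SharedEvents, key: String, body: String) {
    // The lock is awaited, so the runtime can schedule other tasks while we wait for it.
    let mut consumed = consumed_events.lock().await;
    consumed.insert(key, body);
}
```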
```rust
info!(
    "Discovered {} partitions: {:?} (consumer_count hint: {})",
    partition_ids.len(),
    partition_ids,
    config.consumer_count,
```
consumer_count is accepted via CLI and stored in TestConfig, but the implementation spawns one consumer per partition and only uses consumer_count for logging. This makes the CLI option misleading. Either honor consumer_count (e.g., spawn that many consumers and assign partitions) or remove the option/config field to avoid confusion.
Suggested change:

```diff
-info!(
-    "Discovered {} partitions: {:?} (consumer_count hint: {})",
-    partition_ids.len(),
-    partition_ids,
-    config.consumer_count,
+let actual_consumer_count = partition_ids.len();
+if config.consumer_count != actual_consumer_count {
+    return Err(Box::new(std::io::Error::other(format!(
+        "consumer_count ({}) must match the Event Hub partition count ({}) for this test, which creates one consumer per partition",
+        config.consumer_count, actual_consumer_count
+    ))));
+}
+info!(
+    "Discovered {} partitions: {:?}; spawning {} consumers",
+    actual_consumer_count,
+    partition_ids,
+    actual_consumer_count,
```
```rust
batch.try_add_event_data(event, None)?;

// Record it as expected now that it's in a batch
let mut missing = state.missing_events.lock().await;
missing.insert(key, expected);
```
When rolling over to a new batch, the result of batch.try_add_event_data(event, None)? is ignored. If an individual event can’t fit into an empty batch, try_add_event_data returns false and the event is silently not sent, but it is still inserted into missing_events and will be reported as lost. Handle the false case explicitly (return error / increment failure metric) before recording it as expected.
Suggested change:

```diff
-batch.try_add_event_data(event, None)?;
-// Record it as expected now that it's in a batch
-let mut missing = state.missing_events.lock().await;
-missing.insert(key, expected);
+match batch.try_add_event_data(event, None)? {
+    true => {
+        // Record it as expected now that it's in a batch
+        let mut missing = state.missing_events.lock().await;
+        missing.insert(key, expected);
+    }
+    false => {
+        return Err(std::io::Error::other(format!(
+            "event does not fit into an empty batch: batch_index={}, event_index={}, key={}",
+            expected.batch_index, expected.index, key
+        ))
+        .into());
+    }
+}
```
```rust
let end_instant = Instant::now() + config.duration;
let sender = tokio::spawn(send_loop(
    producer.clone(),
    state.clone(),
    config.clone(),
    end_instant,
));

let mut receiver_handles = Vec::new();
for partition_id in partition_ids {
    let state = state.clone();
    let host = host.clone();
    let eventhub = eventhub.clone();
    let credential = credential.clone();

    receiver_handles.push(tokio::spawn(async move {
        receive_loop(host, eventhub, credential, partition_id, state, end_instant).await
    }));
}
```
send_loop is spawned before receivers are opened, but receivers default to StartLocation::Latest. Any events sent before a given partition receiver attaches will never be observed, yet they’ve already been added to missing_events and will be counted as lost/corrupted. Start receivers before the sender and/or use an EnqueuedTime start position captured at test start so the receiver includes all events produced by this run.
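A sketch of the reordering half of that suggestion, reusing the identifiers from the snippet above. Note that it only narrows the race (a spawned receiver may still attach after the first send completes); the enqueued-time start position mentioned above is what closes the gap entirely.

```rust
// Open every per-partition receiver before any events are published.
let end_instant = Instant::now() + config.duration;

let mut receiver_handles = Vec::new();
for partition_id in partition_ids {
    let state = state.clone();
    let host = host.clone();
    let eventhub = eventhub.clone();
    let credential = credential.clone();

    receiver_handles.push(tokio::spawn(async move {
        receive_loop(host, eventhub, credential, partition_id, state, end_instant).await
    }));
}

// Only start publishing once the receiver tasks exist.
let sender = tokio::spawn(send_loop(
    producer.clone(),
    state.clone(),
    config.clone(),
    end_instant,
));
```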
```rust
match batch.try_add_event_data(event, None)? {
    true => {
        // Event was successfully added, record it as expected
        let mut missing = state.missing_events.lock().await;
        missing.insert(key, expected);
    }
    false => {
        // Batch is full, send it and create a new one
        producer.send_batch(batch, None).await?;
```
missing_events tracks every sent event body until it’s received, and entries are never aged out. With a 72h default duration (and if receivers fall behind or errors occur), this map can grow without bound and cause excessive memory usage/OOM. Consider bounding it (e.g., per-partition watermarks, sampling, periodic pruning) or switching to metrics that don’t require storing every outstanding event key.
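One possible bound, sketched with a hypothetical `sent_at` timestamp on the tracked record (the PR's actual value type is not shown here): periodically drop entries older than a chosen window and report them as lost through the existing metrics instead of retaining them for the full run.

```rust
use std::{
    collections::HashMap,
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::sync::Mutex;

// Hypothetical stand-in for whatever per-event record the test keeps in missing_events.
struct OutstandingEvent {
    sent_at: Instant,
}

/// Drop entries older than `max_age` and return how many were pruned, so the caller
/// can count them as lost rather than holding them in memory indefinitely.
async fn prune_outstanding(
    missing_events: &Arc<Mutex<HashMap<String, OutstandingEvent>>>,
    max_age: Duration,
) -> usize {
    let now = Instant::now();
    let mut missing = missing_events.lock().await;
    let before = missing.len();
    missing.retain(|_, event| now.duration_since(event.sent_at) < max_age);
    before - missing.len()
}
```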
```rust
// No subcommand: run all with default args
println!("Running all stress tests with default settings...");
for spec in registry() {
    let default_matches = (spec.configure)(Command::new(spec.name))
        .no_binary_name(true)
        .get_matches_from(Vec::<&str>::new());

    if let Err(e) = (spec.run)(default_matches).await {
        eprintln!("{} FAILED: {}", spec.name, e);
        process::exit(1);
    }
}
```
When no subcommand is provided, main runs all stress tests with default args. With continuous_send_receive_stress defaulting to 72 hours, cargo test --test stress_tests can unexpectedly run for days. Consider requiring an explicit subcommand (or using a much shorter default duration when invoked via the “run all” path) to avoid accidental long-running executions.
```rust
match batch.try_add_event_data(event, None)? {
    true => {
        // Event was successfully added, record it as expected
        let mut missing = state.missing_events.lock().await;
        missing.insert(key, expected);
    }
    false => {
        // Batch is full, send it and create a new one
        producer.send_batch(batch, None).await?;
```
If try_add_event_data(...) returns false on an empty batch (which happens when a single event exceeds the max batch size), this code goes into the “batch is full” path and calls send_batch(batch, ...) on an empty EventDataBatch. EventDataBatch::get_messages() expects a batch envelope and will panic for empty batches. Detect the “event too large for an empty batch” case and return an error instead of trying to send the empty batch.
```rust
let config = ContinuousStressConfig {
    duration: Duration::from_secs(duration_hours * 60 * 60),
    min_batch_size: min_batch_size.min(max_batch_size),
    max_batch_size: max_batch_size.max(min_batch_size),
    min_delay_secs: min_delay.min(max_delay),
    max_delay_secs: max_delay.max(min_delay),
};
```
min_batch_size can be set to 0 via CLI (u32 parser allows it). That can lead to batch_size == 0, and the loop will attempt to send_batch with an empty EventDataBatch (panic) and will also skew metrics. Consider enforcing min-batch >= 1/max-batch >= 1 at the clap layer (or return an error when batch_size resolves to 0).
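A minimal sketch of the clap-level guard, with assumed argument names (`min-batch`, `max-batch`); the range-restricted value parser makes clap reject 0 before the scenario runs.

```rust
use clap::{Arg, Command};

// Hypothetical argument names; the point is rejecting zero at parse time via range(1..).
fn with_batch_args(cmd: Command) -> Command {
    cmd.arg(
        Arg::new("min-batch")
            .long("min-batch")
            .value_parser(clap::value_parser!(u32).range(1..)),
    )
    .arg(
        Arg::new("max-batch")
            .long("max-batch")
            .value_parser(clap::value_parser!(u32).range(1..)),
    )
}
```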
Not an official test, but this captures work from earlier this year and moves it into main.